/* Copyright (C) 2014 InfiniDB, Inc.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

/*******************************************************************************
 * $Id$
 *
 *******************************************************************************/

/*
 * we_brmupdater.cpp
 *
 *  Created on: Dec 13, 2011
 *      Author: bpaul
 */

#include <iostream>
#include <sstream>
using namespace std;

#include "we_define.h"

#include <boost/lexical_cast.hpp>
using namespace boost;

//#include "we_simplesyslog.h"
#include "we_sdhandler.h"

#include "brm.h"
#include "brmtypes.h"
using namespace BRM;

#include "we_brmupdater.h"

namespace WriteEngine
{
//-----------------------------------------------------------------------------

bool WEBrmUpdater::updateCasualPartitionAndHighWaterMarkInBRM()
{
  try
  {
    bool aGood = prepareCasualPartitionInfo();

    if (!aGood)
      cout << "prepareCasualPartitionInfo Failed" << endl;
  }
  catch (std::exception& ex)
  {
    std::string aStr = "Exception in prepareCasualPartitionInfo(); Error Ignored";
    logging::Message::Args errMsgArgs;
    errMsgArgs.add(aStr);
    errMsgArgs.add(ex.what());
    fRef.sysLog(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000);
    cout << aStr << endl;
  }

  try
  {
    bool aSuccess = prepareHighWaterMarkInfo();

    if (!aSuccess)
      throw(std::runtime_error("prepareHWMInfo Failed"));
  }
  catch (std::exception& ex)
  {
    std::string aStr = "prepareHWMInfo() failed... Bailing out!!";
    logging::Message::Args errMsgArgs;
    errMsgArgs.add(aStr);
    aStr = "Need to rollback bulk upload";
    errMsgArgs.add(aStr);
    errMsgArgs.add(ex.what());
    fRef.sysLog(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000);
    cout << aStr << endl;

    return false;
  }

  // If we are here, we packaged informations properly
  // Lets connect to BRM

  if (!createBrmConnection())
  {
    cout << "Brm Connection FAILED" << endl;
    return false;
  }

  int aRc = updateCPAndHWMInBRM();

  if (aRc != 0)
  {
    cout << "Updating High Water Mark Failed" << endl;
  }
  else
  {
    if (fRef.getDebugLvl())
      cout << "Updating High Water Mark Successful!!" << endl << endl;
  }

  /*
  int cpRc = updateCasualPartitionInBRM();
  if(cpRc != 0)
      cout << "Updating Casual Partition Failed" << endl;
  else
  {
      if(fRef.getDebugLvl())
              cout << "Updating Casual Partition Successful" << endl;
  }

  int hwmRc = updateHighWaterMarkInBRM();
  if(hwmRc != 0)
      cout << "Updating High Water Mark Failed" << endl;
  else
  {
      if(fRef.getDebugLvl())
              cout << "Updating High Water Mark Successful!!" << endl << endl;
  }
  */

  releaseBrmConnection();

  // return(!hwmRc)? true: false;
  return (!aRc) ? true : false;
}

//------------------------------------------------------------------------------
// Update Casual Partition information in BRM
//------------------------------------------------------------------------------
int WEBrmUpdater::updateCasualPartitionInBRM()
{
  int rc = 0;

  if (fCPInfo.size() > 0)
  {
    // std::ostringstream oss;
    // oss << "Committing " << fCPInfo.size() << " CP updates for table " <<
    //		fRef.getTableName() << " to BRM";
    // cout << endl << oss.str() << endl;

    // TODO - NOTE. later make this Objection creation once for both CP & HWM
    if (fpBrm)
    {
      rc = fpBrm->mergeExtentsMaxMin(fCPInfo);

      if (rc != BRM::ERR_OK)
      {
        std::string errStr;
        BRM::errString(rc, errStr);
        cout << "BRM ERROR is ***** " << errStr << endl;

        std::ostringstream oss;
        oss << "Error updating BRM with CP data for table " << fRef.getTableName() << " Error: " << errStr
            << endl;

        cout << endl << oss.str() << endl;
      }
    }
  }

  return rc;
}

//------------------------------------------------------------------------------
// Send HWM update information to BRM
//------------------------------------------------------------------------------
int WEBrmUpdater::updateHighWaterMarkInBRM()
{
  int rc = 0;

  if (fHWMInfo.size() > 0)
  {
    // std::ostringstream oss;
    // oss << "Committing " << fHWMInfo.size() << " HWM update(s) for table "<<
    //    fRef.getTableName() << " to BRM";
    // cout << endl << oss.str() << endl;

    if (fpBrm)
    {
      rc = fpBrm->bulkSetHWM(fHWMInfo, 0);

      if (rc != BRM::ERR_OK)
      {
        std::string errStr;
        BRM::errString(rc, errStr);
        cout << "BRM ERROR is ***** " << errStr << endl;

        std::ostringstream oss;
        oss << "Error updating BRM with HWM data for table " << fRef.getTableName() << "error: " << errStr
            << endl;
        cout << endl << oss.str() << endl;
        return rc;
      }
    }
  }

  return rc;
}

//-----------------------------------------------------------------------------

int WEBrmUpdater::updateCPAndHWMInBRM()
{
  int rc = 0;
  size_t i;

  // BUG 4232. some imports may not contain CP but HWM
  if ((fCPInfo.size() > 0) || (fHWMInfo.size() > 0))
  {
    // TODO - NOTE. later make this Objection creation once for both CP & HWM
    if (fpBrm)
    {
      /*
      rc = bulkSetHWMAndCP(const std::vector<BulkSetHWMArg> &,
                                          const std::vector<CPInfo> & setCPDataArgs,
                                          const std::vector<CPInfoMerge> & mergeCPDataArgs,
                                          VER_t transID = 0) DBRM_THROW;
      */
      for (i = 0; i < fCPInfo.size(); i++)
      {
        if (fCPInfo[i].newExtent)
        {
          fCPInfo[i].seqNum = 0;  // to be in sync with DBRM.
        }
      }
      rc = fpBrm->bulkSetHWMAndCP(fHWMInfo, fCPInfoData, fCPInfo, 0);

      // rc = fpBrm->mergeExtentsMaxMin(fCPInfo);
      // rc = fpBrm->bulkSetHWM(fHWMInfo, 0);
      if (rc != BRM::ERR_OK)
      {
        std::string errStr;
        BRM::errString(rc, errStr);
        cout << "BRM ERROR is ***** " << errStr << endl;

        std::ostringstream oss;
        oss << "Error updating BRM with HWM data for table " << fRef.getTableName() << "error: " << errStr
            << endl;
        cout << endl << oss.str() << endl;
        cout << "ERROR: HWM and CP set failed!!" << endl;
        // throw runtime_error("ERROR: bulSetHWMAndCp Failed!!");
        return rc;
      }
    }
    else
      return rc;
  }

  return rc;
}

//------------------------------------------------------------------------------

bool WEBrmUpdater::prepareCasualPartitionInfo()
{
  // cout << "Started prepareCasualPartitionInfo()!!" << endl;
  // CP: 275456 6000000 4776193 -1 0 1
  WESDHandler::StrVec::iterator aIt = fRef.fBrmRptVec.begin();

  while (aIt != fRef.fBrmRptVec.end())
  {
    std::string aEntry = *aIt;

    if ((!aEntry.empty()) && (aEntry.at(0) == 'C'))
    {
      BRM::CPInfoMerge cpInfoMerge;
      const int BUFLEN = 128;
      char aBuff[BUFLEN];
      strncpy(aBuff, aEntry.c_str(), BUFLEN);
      aBuff[BUFLEN - 1] = 0;

      char* pTok = strtok(aBuff, " ");

      if (!pTok)  // ignore the Msg Body
      {
        // cout << "CP Entry : " << aEntry << endl;
        throw(runtime_error("Bad Body in CP entry string"));
      }

      pTok = strtok(NULL, " ");

      if (pTok)
        cpInfoMerge.startLbid = boost::lexical_cast<uint64_t>(pTok);
      else
      {
        // cout << "CP Entry : " << aEntry << endl;
        throw(runtime_error("Bad startLbid in CP entry string"));
      }

      pTok = strtok(NULL, " ");

      if (pTok)
        cpInfoMerge.seqNum = atoi(pTok);
      else
      {
        // cout << "CP Entry : " << aEntry << endl;
        throw(runtime_error("Bad seqNUM in CP entry string"));
      }

      pTok = strtok(NULL, " ");

      if (pTok)
        cpInfoMerge.type = (execplan::CalpontSystemCatalog::ColDataType)atoi(pTok);
      else
      {
        // cout << "CP Entry : " << aEntry << endl;
        throw(runtime_error("Bad type in CP entry string"));
      }

      pTok = strtok(NULL, " ");

      if (pTok)
        cpInfoMerge.colWidth = boost::lexical_cast<int32_t>(pTok);
      else
      {
        // cout << "CP Entry : " << aEntry << endl;
        throw(runtime_error("Bad column width in CP entry string"));
      }

      if (datatypes::isWideDecimalType(cpInfoMerge.type, cpInfoMerge.colWidth))
      {
        datatypes::SystemCatalog::TypeAttributesStd tyAttr(cpInfoMerge.colWidth, 0,
                                                           datatypes::INT128MAXPRECISION);

        pTok = strtok(NULL, " ");

        if (pTok)
          cpInfoMerge.bigMax = tyAttr.decimal128FromString(std::string(pTok), NULL);
        else
        {
          // cout << "CP Entry : " << aEntry << endl;
          throw(runtime_error("Bad MAX in CP entry string"));
        }

        pTok = strtok(NULL, " ");

        if (pTok)
          cpInfoMerge.bigMin = tyAttr.decimal128FromString(std::string(pTok), NULL);
        else
        {
          // cout << "CP Entry : " << aEntry << endl;
          throw(runtime_error("Bad MIN in CP entry string"));
        }
      }
      else
      {
        pTok = strtok(NULL, " ");

        if (pTok)
          cpInfoMerge.max = boost::lexical_cast<int64_t>(pTok);
        else
        {
          // cout << "CP Entry : " << aEntry << endl;
          throw(runtime_error("Bad MAX in CP entry string"));
        }

        pTok = strtok(NULL, " ");

        if (pTok)
          cpInfoMerge.min = boost::lexical_cast<int64_t>(pTok);
        else
        {
          // cout << "CP Entry : " << aEntry << endl;
          throw(runtime_error("Bad MIN in CP entry string"));
        }
      }

      pTok = strtok(NULL, " ");

      if (pTok)
        cpInfoMerge.newExtent = (atoi(pTok) != 0);
      else
      {
        // cout << "CP Entry : " << aEntry << endl;
        throw(runtime_error("Bad newExtent in CP entry string"));
      }

      fCPInfo.push_back(cpInfoMerge);
    }

    ++aIt;
  }

  if (fRef.getDebugLvl())
    cout << "Finished prepareCasualPartitionInfo()!!" << endl;

  return true;
}

//-----------------------------------------------------------------------------

bool WEBrmUpdater::prepareHighWaterMarkInfo()
{
  // HWM: 3056 0 0 8191
  WESDHandler::StrVec::iterator aIt = fRef.fBrmRptVec.begin();

  while (aIt != fRef.fBrmRptVec.end())
  {
    std::string aEntry = *aIt;

    if ((!aEntry.empty()) && (aEntry.at(0) == 'H'))
    {
      BRM::BulkSetHWMArg hwmArg;
      const int BUFLEN = 128;
      char aBuff[BUFLEN];
      strncpy(aBuff, aEntry.c_str(), BUFLEN);
      aBuff[BUFLEN - 1] = 0;

      char* pTok = strtok(aBuff, " ");

      if (!pTok)  // ignore the Msg Body
      {
        // cout << "HWM Entry : " << aEntry << endl;
        throw(runtime_error("Bad Body in HWM entry string"));
      }

      pTok = strtok(NULL, " ");

      if (pTok)
        hwmArg.oid = atoi(pTok);
      else
      {
        // cout << "HWM Entry : " << aEntry << endl;
        throw(runtime_error("Bad OID in HWM entry string"));
      }

      pTok = strtok(NULL, " ");

      if (pTok)
        hwmArg.partNum = atoi(pTok);
      else
      {
        // cout << "HWM Entry : " << aEntry << endl;
        throw(runtime_error("Bad partNum in HWM entry string"));
      }

      pTok = strtok(NULL, " ");

      if (pTok)
        hwmArg.segNum = atoi(pTok);
      else
      {
        // cout << "HWM Entry : " << aEntry << endl;
        throw(runtime_error("Bad partNum in HWM entry string"));
      }

      pTok = strtok(NULL, " ");

      if (pTok)
        hwmArg.hwm = atoi(pTok);
      else
      {
        // cout << "HWM Entry : " << aEntry << endl;
        throw(runtime_error("Bad partNum in HWM entry string"));
      }

      fHWMInfo.push_back(hwmArg);
    }

    ++aIt;
  }

  if (fRef.getDebugLvl())
    cout << "prepareHighWaterMarkInfo() finished" << endl;

  return true;
}

//------------------------------------------------------------------------------
//#ROWS: numRowsRead numRowsInserted

bool WEBrmUpdater::prepareRowsInsertedInfo(std::string Entry, int64_t& TotRows, int64_t& InsRows)
{
  bool aFound = false;

  // ROWS: 3 1
  if ((!Entry.empty()) && (Entry.at(0) == 'R'))
  {
    aFound = true;
    const int BUFLEN = 128;
    char aBuff[BUFLEN];
    strncpy(aBuff, Entry.c_str(), BUFLEN);
    aBuff[BUFLEN - 1] = 0;

    char* pTok = strtok(aBuff, " ");

    if (!pTok)  // ignore the Msg Body
    {
      // cout << "ROWS Entry : " << aEntry << endl;
      throw(runtime_error("Bad Body in entry string"));
    }

    pTok = strtok(NULL, " ");

    if (pTok)
      TotRows = strtol(pTok, NULL, 10);
    else
    {
      // cout << "HWM Entry : " << aEntry << endl;
      throw(runtime_error("Bad Tot ROWS entry string"));
    }

    pTok = strtok(NULL, " ");

    if (pTok)
      InsRows = strtol(pTok, NULL, 10);
    else
    {
      // cout << "HWM Entry : " << aEntry << endl;
      throw(runtime_error("Bad inserted ROWS in entry string"));
    }
  }

  return aFound;
}

//------------------------------------------------------------------------------
//#DATA: columnNumber columnType columnName numOutOfRangeValues

bool WEBrmUpdater::prepareColumnOutOfRangeInfo(std::string Entry, int& ColNum,
                                               execplan::CalpontSystemCatalog::ColDataType& ColType,
                                               std::string& ColName, int& OorValues)
{
  bool aFound = false;
  boost::shared_ptr<execplan::CalpontSystemCatalog> systemCatalogPtr =
      execplan::CalpontSystemCatalog::makeCalpontSystemCatalog();

  // DATA: 3 1
  if ((!Entry.empty()) && (Entry.at(0) == 'D'))
  {
    aFound = true;
    const int BUFLEN = 128;
    char aBuff[BUFLEN];
    strncpy(aBuff, Entry.c_str(), BUFLEN);
    aBuff[BUFLEN - 1] = 0;

    char* pTok = strtok(aBuff, " ");

    if (!pTok)  // ignore the Msg Body
    {
      // cout << "ROWS Entry : " << aEntry << endl;
      throw(runtime_error("Bad OOR entry info"));
    }

    pTok = strtok(NULL, " ");

    if (pTok)
    {
      ColNum = atoi(pTok);
    }
    else
    {
      // cout << "HWM Entry : " << aEntry << endl;
      throw(runtime_error("Bad Oor Column Number entry string"));
    }

    pTok = strtok(NULL, " ");

    if (pTok)
    {
      ColType = (execplan::CalpontSystemCatalog::ColDataType)atoi(pTok);
    }
    else
    {
      // cout << "HWM Entry : " << aEntry << endl;
      throw(runtime_error("Bad Oor Column Type entry string"));
    }

    pTok = strtok(NULL, " ");

    if (pTok)
    {
      uint64_t columnOid = strtol(pTok, NULL, 10);
      execplan::CalpontSystemCatalog::TableColName colname = systemCatalogPtr->colName(columnOid);
      ColName = colname.schema + "." + colname.table + "." + colname.column;
    }
    else
    {
      // cout << "HWM Entry : " << aEntry << endl;
      throw(runtime_error("Bad Column Name entry string"));
    }

    pTok = strtok(NULL, " ");

    if (pTok)
    {
      OorValues = atoi(pTok);
    }
    else
    {
      // cout << "HWM Entry : " << aEntry << endl;
      throw(runtime_error("Bad OorValues entry string"));
    }
  }

  return aFound;
}

//------------------------------------------------------------------------------
//#ERR:  error message file

bool WEBrmUpdater::prepareErrorFileInfo(std::string Entry, std::string& ErrFileName)
{
  bool aFound = false;

  if ((!Entry.empty()) && (Entry.at(0) == 'E'))
  {
    aFound = true;
    const int BUFLEN = 128;
    char aBuff[BUFLEN];
    strncpy(aBuff, Entry.c_str(), BUFLEN);
    aBuff[BUFLEN - 1] = 0;

    char* pTok = strtok(aBuff, " ");

    if (!pTok)  // ignore the Msg Body
    {
      // cout << "ROWS Entry : " << aEntry << endl;
      throw(runtime_error("Bad ERR File entry info"));
    }

    pTok = strtok(NULL, " ");

    if (pTok)
    {
      ErrFileName = pTok;
      // int aLen = ErrFileName.length();
      // if(aLen>0) ErrFileName.insert(aLen-1, 1, 0);
    }
    else
    {
      // cout << "HWM Entry : " << aEntry << endl;
      throw(runtime_error("Bad Error Filename entry string"));
    }
  }

  return aFound;
}

//------------------------------------------------------------------------------
//#BAD:  bad data file, with rejected rows

bool WEBrmUpdater::prepareBadDataFileInfo(std::string Entry, std::string& BadFileName)
{
  bool aFound = false;

  if ((!Entry.empty()) && (Entry.at(0) == 'B'))
  {
    aFound = true;
    const int BUFLEN = 128;
    char aBuff[BUFLEN];
    strncpy(aBuff, Entry.c_str(), BUFLEN);
    aBuff[BUFLEN - 1] = 0;

    char* pTok = strtok(aBuff, " ");

    if (!pTok)  // ignore the Msg Body
    {
      // cout << "ROWS Entry : " << aEntry << endl;
      throw(runtime_error("Bad BAD Filename entry "));
    }

    pTok = strtok(NULL, " ");

    if (pTok)
    {
      BadFileName = pTok;
      // int aLen = BadFileName.length();
      // if(aLen>0) BadFileName.insert(aLen-1, 1, 0);
    }
    else
    {
      // cout << "HWM Entry : " << aEntry << endl;
      throw(runtime_error("Bad BAD Filename entry string"));
    }
  }

  return aFound;
}

//------------------------------------------------------------------------------

} /* namespace WriteEngine */
